IGNITE-22459 Implement zone Raft group listener #5134
base: main
Conversation
CompletableFuture<?>[] futures = zoneTables.stream()
        .map(tbl -> {
            CompletableFuture<Void> createStoragesFuture = runAsync(
To reviewer: this change is needed because there was a bug: getOrCreatePartitionStorages returns a CompletableFuture, which we wrapped in runAsync, producing a double-wrapped future. This means we never waited for the nested future to complete.
Force-pushed bcb1d1b to e830657
clo.result(new TransactionResult(cmd.commit() ? COMMITTED : ABORTED, cmd.commitTimestamp()));
} else if (command instanceof PrimaryReplicaChangeCommand) {
    // This is a hack for tests, this command is not issued in production because no zone-wide placement driver exists yet.
Don't we need a TODO here to remove the hack later?
I don't know which ticket to use here, and I don't expect us to miss this place, because it will likely not work properly when we implement the placement driver.
Found a ticket, added a TODO
processTableSpecificCommand(tablePartitionId, clo);
} else {
    LOG.info("Message type " + command.getClass() + " is not supported by the zone partition RAFT listener yet");
Should this be a WARN?
This is from the previous implementation, and it is temporary code anyway; it will eventually become an assertion.
return Collections.singleton(value).iterator();
}

private static CommandClosure<WriteCommand> idempotentCommandClosure(CommandClosure<WriteCommand> clo) {
It seems that this is a 'no-result-propagating' closure. Why is it called idempotent?
The idea is that you can call result many times and it will not affect the state of the closure :) What do you propose?
Force-pushed e4d3fce to 277fda6
Force-pushed 277fda6 to e53d829
...ain/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java (resolved)
}

private void processTableSpecificCommand(TablePartitionId tablePartitionId, CommandClosure<WriteCommand> clo) {
    tablePartitionRaftListeners.get(tablePartitionId).onWrite(singletonIterator(clo));
It seems that we could batch consecutive closures if they belong to the same table in the original zone closure.
Can you please elaborate? Do you mean that instead of singletonIterator we could use an iterator with multiple commands, grouped by partition ID? But what's the point, if all listeners just call forEach inside anyway? To save on map.get calls? Or do you mean we can parallelize the processing?
> Or do you mean we can parallelize the processing?

I thought about that, and it's a non-trivial, though useful, optimisation, applicable in the case of non-full transactions. So, not for now.

> To save on map.get calls?

Yep, plus iterator instantiations. If we know that a consecutive run of commands in the iterator belongs to the same table, we may batch them.
OriginalIterator: zoneCmd0, tbl1Cmd1, tbl1Cmd2, tbl2Cmd3, tbl1Cmd4, tbl1Cmd5, tbl1Cmd6
tbl1Cmd1 and tbl1Cmd2 may be "converted" to iterator 1, tbl2Cmd3 to iterator 2, and tbl1Cmd4, tbl1Cmd5, tbl1Cmd6 to iterator 3.
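A rough sketch of that grouping, using simplified stand-in types rather than the real Ignite classes:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class BatchingSketch {
    // Stand-ins: a command tagged with the table partition it targets.
    record Cmd(String tablePartitionId, String payload) {}

    record Batch(String tablePartitionId, List<Cmd> commands) {}

    // Splits the original iterator into runs of consecutive commands that
    // target the same table partition; each run then needs only one map
    // lookup and one iterator when handed to the matching table listener.
    static List<Batch> batchByPartition(Iterator<Cmd> original) {
        List<Batch> batches = new ArrayList<>();
        Batch current = null;

        while (original.hasNext()) {
            Cmd cmd = original.next();

            if (current == null || !current.tablePartitionId().equals(cmd.tablePartitionId())) {
                current = new Batch(cmd.tablePartitionId(), new ArrayList<>());
                batches.add(current);
            }

            current.commands().add(cmd);
        }

        return batches;
    }
}
```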
> OriginalIterator: zoneCmd0, tbl1Cmd1, tbl1Cmd2, tbl2Cmd3, tbl1Cmd4, tbl1Cmd5, tbl1Cmd6
> tbl1Cmd1 and tbl1Cmd2 may be "converted" to iterator 1, tbl2Cmd3 to iterator 2, and tbl1Cmd4, tbl1Cmd5, tbl1Cmd6 to iterator 3.

Yep, I thought of the same thing. Ok, I'll try to implement it.
I've tried; it's really too much code, so I can do it in a separate ticket. Also, I don't think it's worth making the code more complex right now.
clo.result(new TransactionResult(cmd.commit() ? COMMITTED : ABORTED, cmd.commitTimestamp()));
} else if (command instanceof PrimaryReplicaChangeCommand) {
    // This is a hack for tests, this command is not issued in production because no zone-wide placement driver exists yet.
Do you need it only in order for implicit transactions to work? The ones that you've introduced in ItZoneDataReplicationTest.java.
Mostly for single puts, but it makes implicit transactions work as well
In that case, please remove it for now. It'll be covered with another ticket.
What about single puts, how am I supposed to test them?
synchronized (commitedConfigurationLock) {
    currentCommitedConfiguration = config;

    tablePartitionRaftListeners.values().forEach(listener -> listener.onConfigurationCommitted(config));
Curious whether it's reasonable to parallelise this. For now, I'd leave it simple though.
@@ -124,7 +124,7 @@ public class PartitionListener implements RaftGroupListener {

     private final UUID localNodeId;

-    private Set<String> currentGroupTopology;
+    private volatile Set<String> currentGroupTopology;
Why? Is it possible to touch it without raft synchronisation?
This is a little tricky. Previously, this variable was only written in onConfigurationCommitted and read in onWrite, which means it was only accessed by the single Raft write thread (onConfigurationCommitted is called from this thread, right?). Now we have a new entry point: onConfigurationCommitted can be called by the aggregate zone-level listener when we add a new table-level listener to it, which can happen in an arbitrary thread. I think we could still avoid volatile here, because:

- If onConfigurationCommitted was called from the Raft thread, then the next read would be from the Raft thread, so no synchronization is needed;
- If onConfigurationCommitted is called when we are adding a new listener, then there will be a put into a ConcurrentMap, followed by a get from the same map when calling onWrite.

However, these guarantees seem too brittle to me, and I'm not sure I'm not missing something here (we are still reading through a race and may be overlooking some side effects). So I decided to use a volatile here, to be safe.
Yep, sounds reasonable. Could you please add a corresponding comment in the code?
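A possible wording for that comment, sketched on a stand-in class (the Javadoc text is illustrative, not taken from the actual patch):

```java
import java.util.Set;

class PartitionListenerSketch {
    /**
     * Names of the nodes in the committed group configuration.
     *
     * <p>Volatile because, besides the single Raft write thread (which writes it in
     * onConfigurationCommitted() and reads it in onWrite()), the aggregate zone-level
     * listener may now invoke onConfigurationCommitted() from an arbitrary thread
     * when a new table-level listener is registered.
     */
    private volatile Set<String> currentGroupTopology;
}
```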
...n/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java (resolved)
@@ -262,70 +257,6 @@ private void stopNode(int idx) {

         nodes.remove(idx);
     }

-    @Test
-    public void testZoneReplicaListener(TestInfo testInfo) throws Exception {
Why is it removed? It's not the equivalent of testReplicationOnAllNodes.
How is it different? We insert data and then read it back from the primary replica...
Also, note that this test uses single-puts =)
KeyValueView<Integer, Integer> kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class);

// Test single insert.
kvView1.put(null, 42, 69);
Please use runInTransaction instead. Full transactions aren't supported yet, and the way you've mocked PrimaryReplicaChangeCommand, along with pseudo-working single puts, brings more mess. It'll be a definition of done for PrimaryReplicaChangeCommand for single puts to work properly.
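For reference, the suggested pattern would look roughly like this (a sketch assuming the public IgniteTransactions#runInTransaction and KeyValueView#put APIs):

```java
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.tx.IgniteTransactions;

class ExplicitTxSketch {
    // The same insert as kvView1.put(null, 42, 69), but wrapped in an
    // explicit transaction instead of relying on an implicit one.
    static void insert(IgniteTransactions transactions, KeyValueView<Integer, Integer> kvView1) {
        transactions.runInTransaction(tx -> kvView1.put(tx, 42, 69));
    }
}
```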
> along with pseudo-working single puts brings more mess

But I still need working single puts; what's wrong with implicit transactions? I can add tests with explicit transactions as well, that makes more sense to me.
> But I still need working single puts

Why?
My point here is that you don't need implicit transactions in order to test your ticket. Implicit transactions will properly work after we add a proper PrimaryReplicaChangeCommand, which in its turn will happen after the introduction of the zone placement driver.
> But I still need working single puts
>
> Why?

Because I need to test that both UpdateCommand and UpdateAllCommand are replicated. How can I do that without single puts?

> My point here is that you don't need implicit transactions in order to test your ticket. Implicit transactions will properly work after we add a proper PrimaryReplicaChangeCommand, which in its turn will happen after the introduction of the zone placement driver.

I agree, the current PrimaryReplicaChangeCommand is needed mainly for single-put support.
...tionTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java (resolved)
@@ -210,6 +212,8 @@ public class PartitionReplicaLifecycleManager extends

     /** Configuration of rebalance retries delay. */
     private final SystemDistributedConfigurationPropertyHolder<Integer> rebalanceRetryDelayConfiguration;

+    private final ConcurrentMap<ZonePartitionId, ZonePartitionRaftListener> zonePartitionRaftListeners = new ConcurrentHashMap<>();
It's an encapsulation leak; we should have Raft listeners inside replicas. Why do you need it?
I agree that this is somewhat of an encapsulation leak, but I think that code like this is even worse:

var zonePartitionReplicaListener = (ZonePartitionReplicaListener) replicaFut.join().listener();
zonePartitionReplicaListener.addTableReplicaListener(tablePartitionId, tablePartitionReplicaListenerFactory);

In my case I would also have to add special methods to access these listeners. Moreover, the lifecycles of the Raft and replica listeners are closely related: they are always added and removed together, so I would even prefer to have both of them in this map, which would make them much easier to manage. WDYT?
I'd rather add something like an addTableProcessor method to the replica in order to substitute

((ZonePartitionReplicaListener) replicaFut.join().listener()).addTableReplicaListener(tablePartitionId, createListener);

with something like

replicaFut.join().addTableProcessor(tablePartitionId, createListener, tableRaftListener)

in PartitionReplicaLifecycleManager#loadTableListenerToZoneReplica. addTableReplicaListener should be removed in that case.
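A hypothetical shape for that method, with stand-in types (none of these signatures exist yet; this just illustrates keeping both listener registrations behind the replica):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-ins for the real listener types and partition ID.
interface TableReplicaListener {}

interface TableRaftListener {}

record TablePartitionId(int tableId, int partitionId) {}

class ReplicaSketch {
    private final Map<TablePartitionId, TableReplicaListener> replicaListeners = new ConcurrentHashMap<>();
    private final Map<TablePartitionId, TableRaftListener> raftListeners = new ConcurrentHashMap<>();

    // Registers both listeners in a single call, keeping their shared
    // lifecycle in one place and hiding the maps inside the replica.
    void addTableProcessor(TablePartitionId id, TableReplicaListener replicaListener, TableRaftListener raftListener) {
        replicaListeners.put(id, replicaListener);
        raftListeners.put(id, raftListener);
    }
}
```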
Makes sense. But this is not possible with the current code state: if we move this logic inside Replica, then we need to be able to access ZonePartitionReplicaListener inside it, but that class is in another module, which is not accessible. We could overcome this by introducing something like ZoneReplica and placing it in the correct module, but this would also mean we need a ZoneReplicaManager and so on.
https://issues.apache.org/jira/browse/IGNITE-22459
Thank you for submitting the pull request.
To streamline the review process of the patch and ensure better code quality,
we ask both the author and a reviewer to verify the following:
The Review Checklist
- There is a single JIRA ticket related to the pull request.
- The web-link to the pull request is attached to the JIRA ticket.
- The JIRA ticket has the Patch Available state.
- The description of the JIRA ticket explains WHAT was made, WHY and HOW.
- The pull request title is treated as the final commit message. The following pattern must be used: IGNITE-XXXX Change summary, where XXXX is the JIRA issue number.
Notes